# -*- coding: utf-8 -*-
# @Time    : 2021/11/13 20:04
# @Author  : travel2040
# @Email   : travel2040@163.com
# @File    : pymongo-读取mongodb数据发送到kafka.py
# Required libraries:
# pip install pymongo
# pip install kafka-python

import pymongo
from kafka import KafkaProducer
import json

# Connection settings for the MongoDB source and the Kafka destination.
# MongoDB address in "host:port" form; it is appended to "mongodb://".
mongo_url = "127.0.0.1:27017"
# Credentials used to authenticate against MongoDB.
mongo_user = "user"
mongo_pass = "user123"
# Kafka bootstrap servers, each in "host:port" form.
kafka_servers = [
    "192.168.0.1:9092",
    "192.168.0.2:9092",
]


def encode_document(doc):
    """Serialize one MongoDB document into the bytes payload sent to Kafka.

    The ``_id`` field is dropped (if present) and the remaining fields are
    dumped as JSON. The input mapping is not modified.

    Args:
        doc: Mapping of field names to JSON-serializable values.

    Returns:
        The cleaned JSON text encoded as UTF-8 bytes.

    Raises:
        TypeError: If a value cannot be serialized by ``json.dumps``.
    """
    payload = {key: value for key, value in doc.items() if key != "_id"}
    texts = json.dumps(payload)
    # Strip raw newlines / carriage returns and every pair of consecutive
    # backslashes from the serialized text (same cleaning as before).
    texts = texts.replace('\n', ' ').replace('\r', '').replace('\\\\', '')
    return texts.encode('utf-8')


def main():
    """Read every document of ``db_name.tb_name`` and publish it to Kafka.

    Documents that fail to serialize or to be handed to the producer are
    reported on stdout and skipped; the remaining documents are still sent.
    """
    # Database.authenticate() was removed in PyMongo 4.0; credentials are
    # passed to the client instead, still authenticating against "admin".
    mongo_client = pymongo.MongoClient(
        "mongodb://%s" % mongo_url,
        username=mongo_user,
        password=mongo_pass,
        authSource="admin",
    )
    try:
        kfk_producer = KafkaProducer(bootstrap_servers=kafka_servers)
        try:
            # Fetch the data stored in MongoDB.
            for d in mongo_client["db_name"]["tb_name"].find():
                try:
                    kfk_producer.send('kfk_topic_name', encode_document(d))
                except Exception as e:
                    # Best effort: report the failing document and go on.
                    print("kfk_producer.send error：%s" % str(e))
            # Flush once after the loop rather than once per message, so the
            # producer can batch records instead of blocking on each send.
            kfk_producer.flush()
        finally:
            kfk_producer.close()
    finally:
        mongo_client.close()


if __name__ == '__main__':
    main()

